<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->

<html lang="en">
<head>
    <title>Edgent Source Streams</title>
</head>
<body>
<div role="main" aria-label="Source streams">
<H1>Edgent Source Streams</H1>
<h2>Introduction</h2>
One of the first things you probably want to do is to bring data
into your Edgent applications.
The task is to create a <em>source stream</em> that contains the data
that is to be processed and analyzed by Edgent.
<br>
Edgent provides a number of connectors that supply source streams, such as IoTF, MQTT, Kafka, Files and HTTP.
For a specific device or application, however, you may need to develop your own source streams.
<h2>Source Streams</h2>
To simplify the discussion we will describe these in terms of reading sensors,
though each approach can apply to non-sensor data, such as polling an HTTP server for information.
<P>
Thus the goal is to produce a <em>source stream</em> where each tuple on the stream represents
a reading from a sensor (or multiple sensors if the tuple object contains a sensor identifier).
</P>
<P>
Source streams are created using functions, and there are three styles of bringing data into Edgent:
<OL>
<LI> <a href="#polling">Polling</a> - a sensor's value is read periodically. </LI>
<LI> <a href="#blocking">Blocking</a> - a request is made to fetch data from a sensor, and the request blocks until a reading is available. </LI>
<LI> <a href="#events">Events</a> - an event is created (by a non-Edgent framework) when the sensor changes. </LI>
</OL>
<h3 id="polling">Polling Sources</h3>
To poll a sensor periodically an application provides a <b>non-blocking</b>
<a href="../../function/Supplier.html">Supplier function</a> that reads the sensor's value
and passes it to <a href="../Topology.html#poll-org.apache.edgent.function.Supplier-long-java.util.concurrent.TimeUnit-">Topology.poll()</a>.
<P>
For example, imagine a library that provides a static method to read an engine's oil temperature.
<pre>
<code>
    double oilTemp = EngineInfo.getOilTemperature();
</code>
</pre>
<br>
To declare a stream in a topology that results in the sensor being read every 100ms,
an application uses a lambda expression as the function and passes it to <code>poll()</code>:
<pre>
<code>
    TStream&lt;Double> oilTemps = topology.poll(() -> EngineInfo.getOilTemperature(), 100, TimeUnit.MILLISECONDS);
</code>
</pre>
At runtime this results in a stream that receives a new tuple approximately every 100ms,
each tuple being the current value of the oil temperature (as a Double object).
<BR>
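<P>
To make the role of the non-blocking function concrete, here is a minimal plain-JDK sketch of periodic polling.
It is an illustration only and not how Edgent implements <code>poll()</code>: the class <code>PollSketch</code>,
the method <code>pollTimes()</code> and the simulated temperature are hypothetical, and it uses
<code>java.util.function.Supplier</code> rather than Edgent's own <code>Supplier</code>.
</P>

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Plain-JDK illustration of periodic polling; not Edgent's implementation.
class PollSketch {

    /** Calls the supplier once per period until 'count' readings have been collected. */
    static <T> List<T> pollTimes(Supplier<T> sensor, long period, TimeUnit unit, int count) {
        List<T> readings = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {
                // The supplier should return promptly: a slow read delays later polls.
                readings.add(sensor.get());
                done.countDown();
            }
        }, 0, period, unit);
        try {
            done.await();                       // wait for the requested number of readings
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();            // stop polling
        }
        return readings;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for EngineInfo.getOilTemperature()
        Supplier<Double> oilTemp = () -> 90.0 + Math.random();
        System.out.println(pollTimes(oilTemp, 100, TimeUnit.MILLISECONDS, 3));
    }
}
```

<P>
The sketch shows why the function should not block: every reading is taken on the polling schedule,
so a call that waits holds up the readings that follow it.
</P>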
A JSON source stream can be created that contains the sensor identifier in addition to the sensor reading.
This allows readings from multiple sensors to share one stream (for example by merging a stream of
oil temperatures polled every 100ms with a stream of coolant temperatures polled every ten seconds),
while still allowing downstream processing to be partitioned by sensor.
<br>
Here's an example of representing the same oil temperature as a JSON object:
<pre>
<code>
    TStream&lt;JsonObject> oilTemps = topology.poll(() ->
       {
          JsonObject j = new JsonObject();
          j.addProperty("id", "oilTemp");
          j.addProperty("value", EngineInfo.getOilTemperature());
          return j;
       }, 100, TimeUnit.MILLISECONDS);
</code>
</pre>
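<P>
The value of carrying a sensor identifier in each tuple can be sketched in plain Java.
The example below is an assumption-laden illustration, not Edgent code: the <code>Reading</code> class stands in
for the JSON object with <code>id</code> and <code>value</code> properties, the merged stream is modelled as a
simple list, and the sample values are made up.
</P>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-JDK illustration of partitioning merged readings by sensor id.
class PartitionSketch {

    /** Hypothetical tagged reading, standing in for a JSON tuple with "id" and "value". */
    static final class Reading {
        final String id;
        final double value;

        Reading(String id, double value) {
            this.id = id;
            this.value = value;
        }
    }

    /** Groups the values of a merged sequence of readings by their sensor id, keeping arrival order. */
    static Map<String, List<Double>> partitionById(List<Reading> merged) {
        Map<String, List<Double>> byId = new LinkedHashMap<>();
        for (Reading r : merged) {
            byId.computeIfAbsent(r.id, k -> new ArrayList<>()).add(r.value);
        }
        return byId;
    }

    /** Made-up merged data: frequent oil readings interleaved with one coolant reading. */
    static Map<String, List<Double>> demo() {
        List<Reading> merged = Arrays.asList(
                new Reading("oilTemp", 91.0),
                new Reading("oilTemp", 91.5),
                new Reading("coolantTemp", 70.0),
                new Reading("oilTemp", 92.0));
        return partitionById(merged);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```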
<h3 id="blocking">Blocking Sources</h3>
Some sensors are of the blocking style: a method is called that blocks until a new reading is available.
As with the polling style, an application provides a
<a href="../../function/Supplier.html">Supplier function</a> that reads the sensor's value. The difference
here is that the method can block while waiting for a reading to become available. When a reading is available the method
returns and the value is put on the returned stream.
<P>
The function is passed to
<a href="../Topology.html#generate-org.apache.edgent.function.Supplier-">Topology.generate()</a>
which at runtime results in a dedicated thread that calls <code>get()</code> in a loop.
Each time a reading becomes available the function returns it, the returned value is placed
on the stream, and the function is called again to wait for the next reading.
<br>
In this example the function is similar to the polling example; the only difference is that the
call to get the sensor reading blocks:
<pre>
<code>
    TStream&lt;JsonObject> drillDepths = topology.generate(() ->
       {
          JsonObject j = new JsonObject();
          j.addProperty("id", "drillDepth");
          // blocks until the drill has completed a
          // requested move and returns its current depth.
          j.addProperty("value", Drill.getDepth());
          return j;
       });
</code>
</pre>
</P>
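<P>
The "dedicated thread that loops" behaviour can be sketched with the JDK alone.
This is an illustration under stated assumptions, not Edgent's implementation: <code>GenerateSketch</code> and its
<code>generate()</code> method are hypothetical, a <code>BlockingQueue</code> plays the part of the drill
(<code>take()</code> blocks the way <code>Drill.getDepth()</code> is described as blocking), and the depth values are made up.
</P>

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-JDK illustration of a blocking source; not Edgent's implementation.
class GenerateSketch {

    /** Starts a dedicated thread that repeatedly calls get() and hands each value downstream. */
    static <T> Thread generate(Supplier<T> blockingSensor, Consumer<T> downstream) {
        Thread thread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                T reading = blockingSensor.get();   // may block until a reading exists
                if (reading == null) {
                    break;                          // supplier was interrupted while waiting
                }
                downstream.accept(reading);
            }
        });
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    /** Feeds three made-up depths through the loop and returns what reached the "stream". */
    static List<Double> demo() {
        BlockingQueue<Double> completedMoves = new LinkedBlockingQueue<>();
        List<Double> stream = new CopyOnWriteArrayList<>();
        CountDownLatch seen = new CountDownLatch(3);

        // Hypothetical blocking read: waits until the drill reports a completed move.
        Supplier<Double> getDepth = () -> {
            try {
                return completedMoves.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        };

        Thread source = generate(getDepth, depth -> {
            stream.add(depth);
            seen.countDown();
        });
        completedMoves.add(1.5);
        completedMoves.add(3.0);
        completedMoves.add(4.5);
        try {
            seen.await();                           // wait until all three tuples arrived
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        source.interrupt();                         // stop the source thread
        return stream;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```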
<P>
<a href="../Topology.html#source-org.apache.edgent.function.Supplier-">Topology.source()</a> is another method
that supports a blocking source. This time the <code>Supplier</code> function passed in is called once
to fetch an <code>Iterable</code> whose elements become the stream's tuples. The iterable may be finite or infinite. In either
case its iterator's <code>next()</code> method may block waiting to obtain the next value to place on the stream.
</P>
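<P>
The shape of such a function, a supplier that is called once and returns an iterable whose <code>next()</code> may block,
can be sketched as follows. Everything here is hypothetical and uses only the JDK: <code>SourceSketch</code>,
<code>firstN()</code> and <code>drainSource()</code> are illustrative names, and the loop in <code>drainSource()</code>
merely stands in for whatever Edgent does with the iterable at runtime.
</P>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Plain-JDK illustration of an iterable-based blocking source; not Edgent's implementation.
class SourceSketch {

    /** A finite Iterable over the next n queue entries; next() blocks until an entry arrives. */
    static <T> Iterable<T> firstN(BlockingQueue<T> queue, int n) {
        return () -> new Iterator<T>() {
            private int remaining = n;

            @Override
            public boolean hasNext() {
                return remaining > 0;
            }

            @Override
            public T next() {
                if (remaining <= 0) {
                    throw new NoSuchElementException();
                }
                remaining--;
                try {
                    return queue.take();            // may block waiting for the next value
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            }
        };
    }

    /** Calls the supplier exactly once, then places every element of the iterable on the "stream". */
    static <T> List<T> drainSource(Supplier<Iterable<T>> data) {
        List<T> stream = new ArrayList<>();
        for (T tuple : data.get()) {
            stream.add(tuple);
        }
        return stream;
    }

    /** A producer thread supplies three made-up values while the source iterates. */
    static List<String> demo() {
        BlockingQueue<String> lines = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            for (String line : Arrays.asList("a", "b", "c")) {
                lines.add(line);
            }
        });
        producer.start();
        return drainSource(() -> firstN(lines, 3));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```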
<h3 id="events">Event Sources</h3>
<P>
This is a typical style for sensors where a framework exists such that an application can register interest
in receiving events or notifications when a sensor changes its value or state. This is typically performed
by the application registering a callback or listener object that is called by the framework when the sensor changes.
</P>
<a href="../Topology.html#events-org.apache.edgent.function.Consumer-">Topology.events()</a> is the method to
create a source stream from events using a listener function. The Edgent application passes
a <a href="../../function/Consumer.html">Consumer function</a> that will be called when the
application starts.
<BR>
The application's consumer function is passed a reference to an <code>eventSubmitter</code>
function provided by Edgent. This function (also a <code>Consumer</code>) is used by the callback
to place tuples onto the stream (it <em>submits the event to the stream</em>).
<BR>
Thus the application's function has to:
<UL>
<LI>Register a callback with the sensor framework that will be called when the sensor changes. </LI>
<LI>Have the callback convert the sensor change event information into the tuple type it wants to send on the stream,
for example a JSON object. In some cases the raw event object is used as the stream's tuple. </LI>
<LI>Execute the function provided by Edgent by calling <code>eventSubmitter.accept(tuple)</code> to place the
tuple onto the stream. </LI>
</UL>
<P>
Here's an example of creating a stream of sensor events in Android.
<BR>
<pre>
<code>
   SensorManager mSensorManager = (SensorManager) getSystemService(Context.SENSOR_SERVICE);

   // Passes a Consumer that registers a SensorChangeEvents listener, which
   // puts each SensorEvent onto the stream using the eventSubmitter
   // supplied by Edgent.

   // Note: for clarity the application function is implemented as a lambda expression.

   TStream&lt;SensorEvent> tempSensor = topology.events(eventSubmitter ->
      mSensorManager.registerListener(
           new SensorChangeEvents(eventSubmitter),
           // registerListener() takes a Sensor object, so look it up by type
           mSensorManager.getDefaultSensor(Sensor.TYPE_AMBIENT_TEMPERATURE),
           SensorManager.SENSOR_DELAY_NORMAL));

// ....

/**
 * Sensor event listener that submits sensor
 * change events as tuples using a Consumer.
 */
public class SensorChangeEvents implements SensorEventListener {
    private final Consumer&lt;SensorEvent> eventSubmitter;
    
    public SensorChangeEvents(Consumer&lt;SensorEvent> eventSubmitter) {
        this.eventSubmitter = eventSubmitter;
    }
    @Override
    public void onSensorChanged(SensorEvent event) {
        // Submit the event directly to the stream
        eventSubmitter.accept(event);
    }

    @Override
    public void onAccuracyChanged(Sensor sensor, int accuracy) {
    }
}

</code>
</pre> 
</P>
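<P>
The three responsibilities of the application's function (register a callback, convert the event, submit the tuple)
can also be sketched without Android, using only the JDK. This is a hedged illustration rather than Edgent code:
<code>DoorSensor</code> is a hypothetical sensor framework, and the <code>events()</code> method below is a
stand-in that only mimics the idea of handing an <code>eventSubmitter</code> to the application's function.
</P>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-JDK illustration of an event source; not Edgent's implementation.
class EventsSketch {

    /** Hypothetical sensor framework that notifies registered listeners of state changes. */
    static class DoorSensor {
        private final List<Consumer<Boolean>> listeners = new ArrayList<>();

        void addListener(Consumer<Boolean> listener) {
            listeners.add(listener);
        }

        void fireChanged(boolean open) {
            listeners.forEach(listener -> listener.accept(open));
        }
    }

    /** Stand-in for an events() method: creates the submitter and calls the application's function once. */
    static <T> List<T> events(Consumer<Consumer<T>> eventSetup) {
        List<T> stream = new ArrayList<>();
        Consumer<T> eventSubmitter = stream::add;   // submitting a tuple appends it to the "stream"
        eventSetup.accept(eventSubmitter);
        return stream;
    }

    static List<String> demo() {
        DoorSensor door = new DoorSensor();
        List<String> stream = events(eventSubmitter ->
                // Step 1: register a callback with the sensor framework.
                door.addListener(open ->
                        // Steps 2 and 3: convert the raw event to the tuple type, then submit it.
                        eventSubmitter.accept(open ? "door:open" : "door:closed")));
        door.fireChanged(true);
        door.fireChanged(false);
        return stream;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

<P>
Note that the application's function runs only once, at setup; it is the registered callback that runs for every event.
</P>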
</div>
</body>
</html>
